package remote

import (
	"context"
	"crypto/md5"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"strings"
	"time"

	"github.com/go-openapi/strfmt"
	amalert "github.com/prometheus/alertmanager/api/v2/client/alert"
	amalertgroup "github.com/prometheus/alertmanager/api/v2/client/alertgroup"
	amgeneral "github.com/prometheus/alertmanager/api/v2/client/general"
	amsilence "github.com/prometheus/alertmanager/api/v2/client/silence"
	"github.com/prometheus/client_golang/prometheus"

	alertingClusterPB "github.com/grafana/alerting/cluster/clusterpb"
	alertingModels "github.com/grafana/alerting/models"
	alertingNotify "github.com/grafana/alerting/notify"

	"gopkg.in/yaml.v3"

	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/infra/tracing"
	apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
	"github.com/grafana/grafana/pkg/services/ngalert/models"
	"github.com/grafana/grafana/pkg/services/ngalert/notifier"
	remoteClient "github.com/grafana/grafana/pkg/services/ngalert/remote/client"
	"github.com/grafana/grafana/pkg/services/ngalert/sender"
)

type stateStore interface {
	GetSilences(ctx context.Context) (string, error)
	GetNotificationLog(ctx context.Context) (string, error)
}

// AutogenFn is a function that adds auto-generated routes to a configuration.
type AutogenFn func(ctx context.Context, logger log.Logger, orgId int64, config *apimodels.PostableApiAlertingConfig, skipInvalid bool) error

// NoopAutogenFn is used to skip auto-generating routes.
func NoopAutogenFn(_ context.Context, _ log.Logger, _ int64, _ *apimodels.PostableApiAlertingConfig, _ bool) error {
	return nil
}

// DecryptFn is a function that takes in an encrypted value and returns it decrypted.
type DecryptFn func(ctx context.Context, payload []byte) ([]byte, error)

type Alertmanager struct {
	autogenFn         AutogenFn
	decrypt           DecryptFn
	defaultConfig     string
	defaultConfigHash string
	log               log.Logger
	metrics           *metrics.RemoteAlertmanager
	orgID             int64
	ready             bool
	sender            *sender.ExternalAlertmanager
	state             stateStore
	tenantID          string
	url               string

	lastConfigSync time.Time
	syncInterval   time.Duration

	amClient    *remoteClient.Alertmanager
	mimirClient remoteClient.MimirClient
}

type AlertmanagerConfig struct {
	OrgID             int64
	URL               string
	TenantID          string
	BasicAuthPassword string

	DefaultConfig string

	// ExternalURL is used in notifications sent by the remote Alertmanager.
	ExternalURL string

	// PromoteConfig is a flag that determines whether the configuration should be used in the remote Alertmanager.
	// The same flag is used for promoting state.
	PromoteConfig bool

	// StaticHeaders are used in email notifications sent by the remote Alertmanager.
	StaticHeaders map[string]string

	// SyncInterval determines how often we should attempt to synchronize configuration.
	SyncInterval time.Duration
}

func (cfg *AlertmanagerConfig) Validate() error {
	if cfg.OrgID == 0 {
		return fmt.Errorf("orgID for remote Alertmanager not set")
	}

	if cfg.TenantID == "" {
		return fmt.Errorf("empty remote Alertmanager tenantID")
	}

	if cfg.URL == "" {
		return fmt.Errorf("empty remote Alertmanager URL for tenant '%s'", cfg.TenantID)
	}
	return nil
}

func NewAlertmanager(cfg AlertmanagerConfig, store stateStore, decryptFn DecryptFn, autogenFn AutogenFn, metrics *metrics.RemoteAlertmanager, tracer tracing.Tracer) (*Alertmanager, error) {
	if err := cfg.Validate(); err != nil {
		return nil, err
	}

	u, err := url.Parse(cfg.URL)
	if err != nil {
		return nil, fmt.Errorf("unable to parse remote Alertmanager URL: %w", err)
	}
	logger := log.New("ngalert.remote.alertmanager")

	mcCfg := &remoteClient.Config{
		Logger:        logger,
		Password:      cfg.BasicAuthPassword,
		TenantID:      cfg.TenantID,
		URL:           u,
		PromoteConfig: cfg.PromoteConfig,
		ExternalURL:   cfg.ExternalURL,
		StaticHeaders: cfg.StaticHeaders,
	}
	mc, err := remoteClient.New(mcCfg, metrics, tracer)
	if err != nil {
		return nil, err
	}

	amcCfg := &remoteClient.AlertmanagerConfig{
		URL:      u,
		TenantID: cfg.TenantID,
		Password: cfg.BasicAuthPassword,
		Logger:   logger,
	}
	amc, err := remoteClient.NewAlertmanager(amcCfg, metrics, tracer)
	if err != nil {
		return nil, err
	}

	// Configure and start the components that sends alerts.
	c := amc.GetAuthedClient()
	doFunc := func(ctx context.Context, _ *http.Client, req *http.Request) (*http.Response, error) {
		return c.Do(req.WithContext(ctx))
	}
	senderLogger := log.New("ngalert.sender.external-alertmanager")
	s, err := sender.NewExternalAlertmanagerSender(
		senderLogger,
		prometheus.NewRegistry(),
		sender.WithDoFunc(doFunc),
		sender.WithUTF8Labels(),
	)
	if err != nil {
		return nil, err
	}
	s.Run()
	err = s.ApplyConfig(cfg.OrgID, 0, []sender.ExternalAMcfg{{URL: cfg.URL + "/alertmanager"}})
	if err != nil {
		return nil, err
	}

	// Parse the default configuration into a postable config.
	pCfg, err := notifier.Load([]byte(cfg.DefaultConfig))
	if err != nil {
		return nil, err
	}

	rawCfg, err := json.Marshal(pCfg)
	if err != nil {
		return nil, err
	}

	// Initialize LastReadinessCheck so it's present even if the check fails.
	metrics.LastReadinessCheck.Set(0)

	return &Alertmanager{
		amClient:          amc,
		autogenFn:         autogenFn,
		decrypt:           decryptFn,
		defaultConfig:     string(rawCfg),
		defaultConfigHash: fmt.Sprintf("%x", md5.Sum(rawCfg)),
		log:               logger,
		metrics:           metrics,
		mimirClient:       mc,
		orgID:             cfg.OrgID,
		state:             store,
		sender:            s,
		syncInterval:      cfg.SyncInterval,
		tenantID:          cfg.TenantID,
		url:               cfg.URL,
	}, nil
}

// ApplyConfig is called by the multi-org Alertmanager on startup and on every sync loop iteration (1m default).
// We do two things on startup:
// 1. Execute a readiness check to make sure the remote Alertmanager we're about to communicate with is up and ready.
// 2. Upload the configuration and state we currently hold.
// On each subsequent call to ApplyConfig we compare and upload only the configuration.
func (am *Alertmanager) ApplyConfig(ctx context.Context, config *models.AlertConfiguration) error {
	if am.ready {
		am.log.Debug("Alertmanager previously marked as ready, skipping readiness check and state sync")
	} else {
		am.log.Debug("Start readiness check for remote Alertmanager", "url", am.url)
		if err := am.checkReadiness(ctx); err != nil {
			return fmt.Errorf("unable to pass the readiness check: %w", err)
		}
		am.log.Debug("Completed readiness check for remote Alertmanager, starting state upload", "url", am.url)

		if err := am.CompareAndSendState(ctx); err != nil {
			return fmt.Errorf("unable to upload the state to the remote Alertmanager: %w", err)
		}
		am.log.Debug("Completed state upload to remote Alertmanager", "url", am.url)
	}

	if time.Since(am.lastConfigSync) < am.syncInterval {
		am.log.Debug("Not syncing configuration to remote Alertmanager, last sync was too recent")
		return nil
	}

	am.log.Debug("Start configuration upload to remote Alertmanager", "url", am.url)
	if err := am.CompareAndSendConfiguration(ctx, config); err != nil {
		return fmt.Errorf("unable to upload the configuration to the remote Alertmanager: %w", err)
	}
	am.log.Debug("Completed configuration upload to remote Alertmanager", "url", am.url)
	return nil
}

func (am *Alertmanager) checkReadiness(ctx context.Context) error {
	err := am.amClient.IsReadyWithBackoff(ctx)
	if err != nil {
		return err
	}

	am.log.Debug("Alertmanager readiness check successful")
	am.metrics.LastReadinessCheck.SetToCurrentTime()
	am.ready = true
	return nil
}

// CompareAndSendConfiguration checks whether a given configuration is being used by the remote Alertmanager.
// If not, it sends the configuration to the remote Alertmanager.
func (am *Alertmanager) CompareAndSendConfiguration(ctx context.Context, config *models.AlertConfiguration) error {
	c, err := notifier.Load([]byte(config.AlertmanagerConfiguration))
	if err != nil {
		return err
	}

	// Add auto-generated routes and decrypt before comparing.
	if err := am.autogenFn(ctx, am.log, am.orgID, &c.AlertmanagerConfig, true); err != nil {
		return err
	}
	rawDecrypted, configHash, err := am.decryptConfiguration(ctx, c)
	if err != nil {
		return err
	}

	// Send the configuration only if we need to.
	if !am.shouldSendConfig(ctx, configHash) {
		return nil
	}

	isDefault, err := am.isDefaultConfiguration(configHash)
	if err != nil {
		return err
	}

	decrypted, err := notifier.Load(rawDecrypted)
	if err != nil {
		return err
	}

	return am.sendConfiguration(ctx, decrypted, config.ConfigurationHash, config.CreatedAt, isDefault)
}

func (am *Alertmanager) isDefaultConfiguration(configHash [16]byte) (bool, error) {
	return fmt.Sprintf("%x", configHash) == am.defaultConfigHash, nil
}

// decryptConfiguration decrypts the configuration in-place and returns the decrypted configuration alongside its hash.
// Should not be used outside of this package and the specific use case of decrypting the configuration before sending
// it to the remote Alertmanager.
func (am *Alertmanager) decryptConfiguration(ctx context.Context, cfg *apimodels.PostableUserConfig) ([]byte, [16]byte, error) {
	fn := func(payload []byte) ([]byte, error) {
		return am.decrypt(ctx, payload)
	}

	// Iterate through receivers and decrypt secure settings.
	// It's not necessary to be careful about not modifying the original, as it's used only in a specific context where
	// the config is read from json and then immediately sent to the remote Alertmanager.
	for _, rcv := range cfg.AlertmanagerConfig.Receivers {
		for _, gmr := range rcv.GrafanaManagedReceivers {
			decrypted, err := gmr.DecryptSecureSettings(fn)
			if err != nil {
				return nil, [16]byte{}, fmt.Errorf("unable to decrypt settings on receiver %q (uid: %q): %w", gmr.Name, gmr.UID, err)
			}
			gmr.SecureSettings = decrypted
		}
	}

	rawDecrypted, err := json.Marshal(cfg)
	if err != nil {
		return nil, [16]byte{}, fmt.Errorf("unable to marshal decrypted configuration: %w", err)
	}

	return rawDecrypted, md5.Sum(rawDecrypted), nil
}

func (am *Alertmanager) sendConfiguration(ctx context.Context, decrypted *apimodels.PostableUserConfig, hash string, createdAt int64, isDefault bool) error {
	am.metrics.ConfigSyncsTotal.Inc()
	if err := am.mimirClient.CreateGrafanaAlertmanagerConfig(
		ctx,
		decrypted,
		hash,
		createdAt,
		isDefault,
	); err != nil {
		am.metrics.ConfigSyncErrorsTotal.Inc()
		return err
	}
	am.metrics.LastConfigSync.SetToCurrentTime()
	am.lastConfigSync = time.Now()
	return nil
}

// CompareAndSendState gets the Alertmanager's internal state and compares it with the remote Alertmanager's one.
// If the states are different, it updates the remote Alertmanager's state with that of the internal Alertmanager.
func (am *Alertmanager) CompareAndSendState(ctx context.Context) error {
	state, err := am.getFullState(ctx)
	if err != nil {
		return err
	}

	if am.shouldSendState(ctx, state) {
		am.metrics.StateSyncsTotal.Inc()
		if err := am.mimirClient.CreateGrafanaAlertmanagerState(ctx, state); err != nil {
			am.metrics.StateSyncErrorsTotal.Inc()
			return err
		}
		am.metrics.LastStateSync.SetToCurrentTime()
	}
	return nil
}

// SaveAndApplyConfig decrypts and sends a configuration to the remote Alertmanager.
func (am *Alertmanager) SaveAndApplyConfig(ctx context.Context, cfg *apimodels.PostableUserConfig) error {
	// Get the hash for the encrypted configuration.
	rawCfg, err := json.Marshal(cfg)
	if err != nil {
		return err
	}
	hash := fmt.Sprintf("%x", md5.Sum(rawCfg))

	// Add auto-generated routes and decrypt before sending.
	if err := am.autogenFn(ctx, am.log, am.orgID, &cfg.AlertmanagerConfig, false); err != nil {
		return err
	}
	rawDecrypted, _, err := am.decryptConfiguration(ctx, cfg)
	if err != nil {
		return err
	}

	decrypted, err := notifier.Load(rawDecrypted)
	if err != nil {
		return err
	}

	return am.sendConfiguration(ctx, decrypted, hash, time.Now().Unix(), false)
}

// SaveAndApplyDefaultConfig sends the default Grafana Alertmanager configuration to the remote Alertmanager.
func (am *Alertmanager) SaveAndApplyDefaultConfig(ctx context.Context) error {
	c, err := notifier.Load([]byte(am.defaultConfig))
	if err != nil {
		return fmt.Errorf("unable to parse the default configuration: %w", err)
	}

	// Add auto-generated routes and decrypt before sending.
	if err := am.autogenFn(ctx, am.log, am.orgID, &c.AlertmanagerConfig, true); err != nil {
		return err
	}
	rawDecrypted, _, err := am.decryptConfiguration(ctx, c)
	if err != nil {
		return err
	}

	decrypted, err := notifier.Load(rawDecrypted)
	if err != nil {
		return err
	}

	return am.sendConfiguration(
		ctx,
		decrypted,
		am.defaultConfigHash,
		time.Now().Unix(),
		true,
	)
}

func (am *Alertmanager) CreateSilence(ctx context.Context, silence *apimodels.PostableSilence) (string, error) {
	defer func() {
		if r := recover(); r != nil {
			am.log.Error("Panic while creating silence", "err", r)
		}
	}()

	params := amsilence.NewPostSilencesParamsWithContext(ctx).WithSilence(silence)
	res, err := am.amClient.Silence.PostSilences(params)
	if err != nil {
		return "", err
	}

	return res.Payload.SilenceID, nil
}

func (am *Alertmanager) DeleteSilence(ctx context.Context, silenceID string) error {
	defer func() {
		if r := recover(); r != nil {
			am.log.Error("Panic while deleting silence", "err", r)
		}
	}()

	params := amsilence.NewDeleteSilenceParamsWithContext(ctx).WithSilenceID(strfmt.UUID(silenceID))
	_, err := am.amClient.Silence.DeleteSilence(params)
	if err != nil {
		return err
	}
	return nil
}

func (am *Alertmanager) GetSilence(ctx context.Context, silenceID string) (apimodels.GettableSilence, error) {
	defer func() {
		if r := recover(); r != nil {
			am.log.Error("Panic while getting silence", "err", r)
		}
	}()

	params := amsilence.NewGetSilenceParamsWithContext(ctx).WithSilenceID(strfmt.UUID(silenceID))
	res, err := am.amClient.Silence.GetSilence(params)
	if err != nil {
		return apimodels.GettableSilence{}, err
	}

	return *res.Payload, nil
}

func (am *Alertmanager) ListSilences(ctx context.Context, filter []string) (apimodels.GettableSilences, error) {
	defer func() {
		if r := recover(); r != nil {
			am.log.Error("Panic while listing silences", "err", r)
		}
	}()

	params := amsilence.NewGetSilencesParamsWithContext(ctx).WithFilter(filter)
	res, err := am.amClient.Silence.GetSilences(params)
	if err != nil {
		return apimodels.GettableSilences{}, err
	}

	return res.Payload, nil
}

func (am *Alertmanager) GetAlerts(ctx context.Context, active, silenced, inhibited bool, filter []string, receiver string) (apimodels.GettableAlerts, error) {
	defer func() {
		if r := recover(); r != nil {
			am.log.Error("Panic while getting alerts", "err", r)
		}
	}()

	params := amalert.NewGetAlertsParamsWithContext(ctx).
		WithActive(&active).
		WithSilenced(&silenced).
		WithInhibited(&inhibited).
		WithFilter(filter).
		WithReceiver(&receiver)

	res, err := am.amClient.Alert.GetAlerts(params)
	if err != nil {
		return apimodels.GettableAlerts{}, err
	}

	return res.Payload, nil
}

func (am *Alertmanager) GetAlertGroups(ctx context.Context, active, silenced, inhibited bool, filter []string, receiver string) (apimodels.AlertGroups, error) {
	defer func() {
		if r := recover(); r != nil {
			am.log.Error("Panic while getting alert groups", "err", r)
		}
	}()

	params := amalertgroup.NewGetAlertGroupsParamsWithContext(ctx).
		WithActive(&active).
		WithSilenced(&silenced).
		WithInhibited(&inhibited).
		WithFilter(filter).
		WithReceiver(&receiver)

	res, err := am.amClient.Alertgroup.GetAlertGroups(params)
	if err != nil {
		return apimodels.AlertGroups{}, err
	}

	return res.Payload, nil
}

func (am *Alertmanager) PutAlerts(ctx context.Context, alerts apimodels.PostableAlerts) error {
	for _, a := range alerts.PostableAlerts {
		for k, v := range a.Labels {
			// The Grafana Alertmanager skips empty and namespace UID labels.
			// To get the same alert fingerprint we need to remove these labels too.
			// https://github.com/grafana/alerting/blob/2dda1c67ec02625ac9fc8607157b3d5825d47919/notify/grafana_alertmanager.go#L722-L724
			if len(v) == 0 || k == alertingModels.NamespaceUIDLabel {
				delete(a.Labels, k)
			}
		}
	}
	am.log.Debug("Sending alerts to a remote alertmanager", "url", am.url, "alerts", len(alerts.PostableAlerts))
	am.sender.SendAlerts(alerts)
	return nil
}

// GetStatus retrieves the remote Alertmanager configuration.
func (am *Alertmanager) GetStatus(ctx context.Context) (apimodels.GettableStatus, error) {
	defer func() {
		if r := recover(); r != nil {
			am.log.Error("Panic while getting status", "err", r)
		}
	}()

	params := amgeneral.NewGetStatusParamsWithContext(ctx)
	res, err := am.amClient.General.GetStatus(params)
	if err != nil {
		return apimodels.GettableStatus{}, err
	}

	var cfg apimodels.PostableApiAlertingConfig
	if err := yaml.Unmarshal([]byte(*res.Payload.Config.Original), &cfg); err != nil {
		return apimodels.GettableStatus{}, err
	}

	return *apimodels.NewGettableStatus(&cfg), nil
}

func (am *Alertmanager) GetReceivers(ctx context.Context) ([]apimodels.Receiver, error) {
	return am.mimirClient.GetReceivers(ctx)
}

func (am *Alertmanager) TestReceivers(ctx context.Context, c apimodels.TestReceiversConfigBodyParams) (*alertingNotify.TestReceiversResult, int, error) {
	fn := func(payload []byte) ([]byte, error) {
		return am.decrypt(ctx, payload)
	}

	receivers := make([]*alertingNotify.APIReceiver, 0, len(c.Receivers))
	for _, r := range c.Receivers {
		integrations := make([]*alertingNotify.GrafanaIntegrationConfig, 0, len(r.GrafanaManagedReceivers))
		for _, gr := range r.GrafanaManagedReceivers {
			decrypted, err := gr.DecryptSecureSettings(fn)
			if err != nil {
				return nil, 0, err
			}

			integrations = append(integrations, &alertingNotify.GrafanaIntegrationConfig{
				UID:                   gr.UID,
				Name:                  gr.Name,
				Type:                  gr.Type,
				DisableResolveMessage: gr.DisableResolveMessage,
				Settings:              json.RawMessage(gr.Settings),
				SecureSettings:        decrypted,
			})
		}
		receivers = append(receivers, &alertingNotify.APIReceiver{
			ConfigReceiver: r.Receiver,
			GrafanaIntegrations: alertingNotify.GrafanaIntegrations{
				Integrations: integrations,
			},
		})
	}
	var alert *alertingNotify.TestReceiversConfigAlertParams
	if c.Alert != nil {
		alert = &alertingNotify.TestReceiversConfigAlertParams{Annotations: c.Alert.Annotations, Labels: c.Alert.Labels}
	}

	return am.mimirClient.TestReceivers(ctx, alertingNotify.TestReceiversConfigBodyParams{
		Alert:     alert,
		Receivers: receivers,
	})
}

func (am *Alertmanager) TestTemplate(ctx context.Context, c apimodels.TestTemplatesConfigBodyParams) (*notifier.TestTemplatesResults, error) {
	for _, alert := range c.Alerts {
		notifier.AddDefaultLabelsAndAnnotations(alert)
	}

	return am.mimirClient.TestTemplate(ctx, alertingNotify.TestTemplatesConfigBodyParams{
		Alerts:   c.Alerts,
		Template: c.Template,
		Name:     c.Name,
	})
}

// StopAndWait is called when the grafana server is instructed to shut down or an org is deleted.
// In the context of a "remote Alertmanager" it is a good heuristic for Grafana is about to shut down or we no longer need you.
func (am *Alertmanager) StopAndWait() {
	am.sender.Stop()
}

func (am *Alertmanager) Ready() bool {
	return am.ready
}

// SilenceState returns the Alertmanager's silence state as a SilenceState. Currently, does not retrieve the state
// remotely and instead uses the value from the state store.
func (am *Alertmanager) SilenceState(ctx context.Context) (alertingNotify.SilenceState, error) {
	silences, err := am.state.GetSilences(ctx)
	if err != nil {
		return nil, fmt.Errorf("error getting silences: %w", err)
	}

	return alertingNotify.DecodeState(strings.NewReader(silences))
}

// getFullState returns a base64-encoded protobuf message representing the Alertmanager's internal state.
func (am *Alertmanager) getFullState(ctx context.Context) (string, error) {
	var parts []alertingClusterPB.Part

	state, err := am.SilenceState(ctx)
	if err != nil {
		return "", fmt.Errorf("error getting silences: %w", err)
	}
	b, err := state.MarshalBinary()
	if err != nil {
		return "", fmt.Errorf("error marshalling silences: %w", err)
	}
	parts = append(parts, alertingClusterPB.Part{Key: notifier.SilencesFilename, Data: b})

	notificationLog, err := am.state.GetNotificationLog(ctx)
	if err != nil {
		return "", fmt.Errorf("error getting notification log: %w", err)
	}
	parts = append(parts, alertingClusterPB.Part{Key: notifier.NotificationLogFilename, Data: []byte(notificationLog)})

	fs := alertingClusterPB.FullState{
		Parts: parts,
	}
	b, err = fs.Marshal()
	if err != nil {
		return "", fmt.Errorf("error marshaling full state: %w", err)
	}

	return base64.StdEncoding.EncodeToString(b), nil
}

// shouldSendConfig compares the remote Alertmanager configuration with our local one.
// It returns true if the configurations are different.
func (am *Alertmanager) shouldSendConfig(ctx context.Context, hash [16]byte) bool {
	rc, err := am.mimirClient.GetGrafanaAlertmanagerConfig(ctx)
	if err != nil {
		// Log the error and return true so we try to upload our config anyway.
		am.log.Warn("Unable to get the remote Alertmanager configuration for comparison, sending the configuration without comparing", "err", err)
		return true
	}

	if rc.Promoted != am.mimirClient.ShouldPromoteConfig() {
		return true
	}

	rawRemote, err := json.Marshal(rc.GrafanaAlertmanagerConfig)
	if err != nil {
		am.log.Error("Unable to marshal the remote Alertmanager configuration for comparison", "err", err)
		return true
	}
	return md5.Sum(rawRemote) != hash
}

// shouldSendState compares the remote Alertmanager state with our local one.
// It returns true if the states are different.
func (am *Alertmanager) shouldSendState(ctx context.Context, state string) bool {
	rs, err := am.mimirClient.GetGrafanaAlertmanagerState(ctx)
	if err != nil {
		// Log the error and return true so we try to upload our state anyway.
		am.log.Warn("Unable to get the remote Alertmanager state for comparison, sending the state without comparing", "err", err)
		return true
	}

	return rs.State != state
}
